I wrote it only as rapid illustration for method non-blocked reading from blocked stream.
/* * Author, Copyright: Oleg Borodin <onborodin@gmail.com> */ #include <stdlib.h> #include <stdio.h> #include <stdbool.h> #include <unistd.h> #include <string.h> #include <fcntl.h> #include <signal.h> #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <sys/time.h> #include <err.h> void msleep(int dur) { usleep(dur * 1000); } long int timestamp() { struct timeval tp; gettimeofday(&tp, NULL); return tp.tv_sec * 1000 + tp.tv_usec / 1000; } typedef struct stream { pthread_cond_t readcond; pthread_mutex_t readmutex; pthread_t* reader_thr; pthread_t* watcher_thr; int sockfd; char* buffer; int bufcap; int readsize; long int lastread; long int timeout; int done; } stream_t; void stream_sighandler(int signo) { fprintf(stderr, "have signal, thread id %d, exit\n", (int)pthread_self()); pthread_exit(NULL); } void* stream_reader(void* argv) { struct sigaction act; memset(&act, 0, sizeof(act)); sigset_t mask; sigemptyset(&mask); sigaddset(&mask, SIGRTMIN); act.sa_handler = stream_sighandler; act.sa_mask = mask; sigaction(SIGRTMIN, &act, NULL); stream_t* s = (stream_t*)argv; s->lastread = timestamp(); printf("reader start\n"); sleep(3); strcpy(s->buffer, "hello!"); s->readsize = strlen(s->buffer); long int dur = timestamp() - s->lastread; s->done = 1; pthread_mutex_unlock(&(s->readmutex)); pthread_cond_signal(&(s->readcond)); printf("reader done after %ld us\n", dur); return NULL; } void* stream_watcher(void* argv) { stream_t* s = (stream_t*)argv; printf("watcher start\n"); while(s->done != 1) { printf("watch...\n"); long dur = (timestamp() - s->lastread); if (dur > s->timeout) { printf("read timeout!\n"); pthread_kill(*(s->reader_thr), SIGRTMIN); s->done = 1; pthread_mutex_unlock(&(s->readmutex)); pthread_cond_signal(&(s->readcond)); break; } msleep(500); } printf("watcher done\n"); return NULL; } stream_t* stream_create(int timeout) { stream_t* s = (stream_t*)malloc(sizeof(stream_t)); s->bufcap = 1024; s->buffer = (char*)malloc(s->bufcap); memset(s->buffer, 0, s->bufcap); s->reader_thr = (pthread_t*)malloc(sizeof(pthread_t)); memset(s->reader_thr, 0, sizeof(pthread_t)); s->watcher_thr = (pthread_t*)malloc(sizeof(pthread_t)); memset(s->watcher_thr, 0, sizeof(pthread_t)); s->sockfd = 0; s->readsize = 0; s->done = 0; s->timeout = timeout; return s; } void stream_close(stream_t* s) { free(s->buffer); free(s->reader_thr); free(s->watcher_thr); free(s); } int stream_recv(stream_t* s) { printf("read thread start\n"); pthread_mutex_lock(&(s->readmutex)); pthread_create(s->reader_thr, NULL, stream_reader, (void*)s); pthread_create(s->watcher_thr, NULL, stream_watcher, (void*)s); pthread_detach(*(s->reader_thr)); pthread_detach(*(s->watcher_thr)); while(s->done != 1) { pthread_cond_wait(&(s->readcond), &(s->readmutex)); printf("while...\n"); } printf("read thread done\n"); return s->readsize; } int main(int argc, char **argv) { stream_t* s = NULL; int timeout = 0; int rv = 0; timeout = 2000; printf("create stream with timeout = %d ms\n", timeout); s = stream_create(timeout); if ((rv = stream_recv(s)) > 0) { char str[] = ""; strncat(str, s->buffer, rv); printf("stream buffer: %s\n", str); } else { printf("stream buffer empty =(\n"); } stream_close(s); timeout = 5000; printf("create stream with timeout = %d ms\n", timeout); s = stream_create(timeout); if ((rv = stream_recv(s)) > 0) { char str[] = ""; strncat(str, s->buffer, rv); printf("stream buffer: %s\n", str); } else { printf("stream buffer empty =(\n"); } stream_close(s); return 0; }
$ ./thr-reader create stream with timeout = 2000 ms read thread start watcher start watch... reader start watch... watch... watch... watch... read timeout! watcher done while... read thread done stream buffer empty =( create stream with timeout = 5000 ms read thread start have signal, thread id 27354368, exit reader start watcher start watch... watch... watch... watch... watch... watch... reader done after 3006 us while... read thread done stream buffer: hello! $